Reactor 是concurrency-agnostic
,花了一點時間研究這個英文單字的意思,concurrency是我們熟悉的併發,agnostic是未知論者(認為神存在但不為人知也無法確認的),推測的意思是對於Reactor來說並不在意concurrency是存在或是不存在,取決於使用者的行為,希望有高手可以解釋得更精準,Reactor文件上給出的意思是他不會強迫使用併發(concurrency model), 讓開發者自己去決定,如果你需要使用concurrency,Reactor提供了Scheduler
來幫助開發者。
在沒有特別設定的情況下,Flux & Mono並不是特別一個專門的緒(Thread
)去處理,而是根據最後subscribe()
的緒來決定的。在官方的範例中,在main裡面宣告Mono,另開一條Thread
來subscribe
,從印出的結果就可以看出實際上執行緒是根據subscribe
的。
public static void main(String[] args) throws InterruptedException {
System.out.println(Thread.currentThread().getName());
final Mono<String> mono = Mono.just("hello ");
Thread t =
new Thread(
() ->
mono.map(msg -> msg + "thread ")
.subscribe(v ->
System.out.println(
v + Thread.currentThread().getName())
));
t.start();
t.join();
// main
// hello thread Thread-0
}
工具類別Schedulers
提供了幾個靜態方法:
Schedulers.immediate()
:基本上不會用到這個方法,他不會做任何的操作,可以當成是null,有可能的使用場景是某個api需要傳入Schedulers
,但你並不想要更換Thread,這時候就可以傳入Schedulers.immediate()
。Schedulers.single()
:只有一條且重複使用的Thread
。Schedulers.elastic()
:會彈性的增加Thread
(無上限),適用於需要較長時間處理的任務(task),像是呼叫阻斷(blocking)的服務或是I/O,但可能會導致太多的Thread
或是一些backpressure的問題,再推出Schedulers.boundedElastic()
後就不建議使用(Deprecated
)。Schedulers.boundedElastic()
:就像是elastic()
,只是加上了一些限制來避免產生過多Thread
的問題,有一個worker pool,預設閒置60秒就會release Thread
。Schedulers.parallel()
:適用於快速且non-blocking的任務,根據CPU來產生Thread
的數量。fromExecutorService(ExecutorService)
:如果以前有預先就存在的ExecutorService
。fromExecutorService
,都是有一個共用(global),如果希望可以單獨使用可以利用Schedulers.newXXXX()的方式來新增。有一些operator使用指定的Scheduler
來執行,通常也會讓你透過傳入參數的方式來改變,像是Flux.interval(Duration.ofMillis(300))
,每三百毫秒推送,從程式碼可以看出預設使用Schedulers.parallel()
,如果想要自己指定也可以直接傳入指定的Scheduler
。
public static Flux<Long> interval(Duration period) {
return interval(period, Schedulers.parallel());
}
除了以上的Scheduler
之外,其實還隱藏一個VirtualTimeScheduler
,屬於reactor.test.scheduler
,有自己的一個虛擬時鐘來控制時間,隨需的增加,控制時間可能不夠精準,因為只能前進,不能讓時光倒流,這在需要時間流逝才可以測試的場景就會十分好用方便,而不需要真的去等待。
下面這個例子是有一個延遲10秒、每5秒會發送的Flux,正常的情況下是看不到任何結果的,因為main-thread一下子就結束了,根本還來不及。如果想要看到資料就要在main thread加上sleep
來延遲時間,這邊第一個不是剛好十秒是因為會有一點時間差,這樣等待了約二十秒後就可以看到預期的結果。
@Test
void testDefaultScheduler() throws InterruptedException {
List<Long> list = new ArrayList<>();
Flux
.interval(Duration.ofSeconds(10), Duration.ofSeconds(5))
.take(3)
.subscribe(list::add);
Thread.sleep(10500);
System.out.println(list);
Thread.sleep(5000);
System.out.println(list);
Thread.sleep(5000);
System.out.println(list);
/*
[0]
[0, 1]
[0, 1, 2]
*/
}
這時候如果有VirtualTimeScheduler
的幫助,手動加速時間,就可以在正常main跑完的同時,就可以看到結果,節省了不少時間。
@Test
void testVirtualTimeScheduler() throws InterruptedException {
List<Long> list = new ArrayList<>();
VirtualTimeScheduler scheduler = VirtualTimeScheduler.getOrSet();
Flux
.interval(Duration.ofSeconds(10), Duration.ofSeconds(5), scheduler)
.take(3)
.subscribe(list::add);
scheduler.advanceTimeBy(Duration.ofSeconds(10));
System.out.println(list);
scheduler.advanceTimeBy(Duration.ofSeconds(5));
System.out.println(list);
scheduler.advanceTimeBy(Duration.ofSeconds(5));
System.out.println(list);
/*
[0]
[0, 1]
[0, 1, 2]
*/
}
Reactor提供各式的方便的工具讓開發者可以根據使用情境來選擇,下一篇就來介紹到底Reactor是要如何去使用這些Scheduler。隨著文章往後開始比較複雜,如果有寫得不太好的地方希望可以留言討論,感謝!